/**
* Project Name:KettleUtil
* Date:2016年6月28日
* Copyright (c) 2016, jingma All Rights Reserved.
*/

package cn.benma666.kettle.mytuils;

import cn.benma666.constants.UtilConst;
import cn.benma666.iframe.*;
import cn.benma666.kettle.domain.VJob;
import cn.benma666.kettle.loglistener.FileLoggingEventListener;
import cn.benma666.myutils.StringUtil;
import cn.benma666.sjsj.web.LjqManager;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.util.TypeUtils;
import org.beetl.sql.core.SqlId;
import org.pentaho.di.core.database.util.Db;
import org.pentaho.di.core.logging.KettleLogStore;
import org.pentaho.di.core.logging.LogLevel;
import org.pentaho.di.job.Job;
import org.pentaho.di.job.JobMeta;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;

import java.awt.image.BufferedImage;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Kettle manager: starts, stops, kills and tracks Kettle jobs, and offers
 * helpers for reading their logs, rendering their images and persisting
 * their status.
 */
public class KettleManager extends BasicObject {
    /**
     * Status value: start failed.
     */
    public static final String START_FAILED = "StartFailed";
    /**
     * Status value: stop failed. Also used by {@link #getJobStatus(VJob)} for
     * any Kettle status that mentions errors.
     */
    public static final String STOP_FAILED = "StopFailed";
    /**
     * Jobs currently registered with this manager, keyed by
     * {@link VJob#getKey()} (per the original note: "repository_jobId").
     */
    public static final Map<String,VJob> JobMap = new ConcurrentHashMap<>();

    /**
     * Startup initialisation: restarts the jobs that were active before.
     * <p>
     * Loads the repository dictionary {@code KETTLE_GLPT_ZYGL_ZYK}, selects the
     * first repository via {@code Kettle.use}, registers a
     * {@link FileLoggingEventListener} on the Kettle log appender, and then for
     * every repository: selects the jobs whose status is Running, Waiting or
     * {@link #STOP_FAILED} for this application ({@code Conf.getAppdm()}),
     * runs the {@code initUpdateJob} statement with the same criteria, and
     * starts each selected job. A job that fails to start is logged and does
     * not prevent the remaining jobs from starting.
     * <p>
     * When the dictionary holds no repository, a warning is logged and nothing
     * else is done.
     *
     * @author jingma
     */
    public static void init(){
        // Dictionary describing the configured Kettle repositories.
        JSONObject zykMap = DictManager.zdMap("KETTLE_GLPT_ZYGL_ZYK");
        slog.info("加载资源库："+zykMap.size());
        if(zykMap.isEmpty()){
            // Without this guard the first-key lookup below would fail with an
            // index error that says nothing about the real cause.
            slog.warn("未配置任何资源库，跳过作业初始化");
            return;
        }
        Kettle.use(zykMap.keySet().iterator().next());
        KettleLogStore.getAppender().addLoggingEventListener(
                new FileLoggingEventListener() );
        // Statuses that mark a job as one to be started again.
        final Object[] restartStatuses = {Trans.STRING_RUNNING,
                Trans.STRING_WAITING,STOP_FAILED};
        for(String zyk:zykMap.keySet()){
            // Fetch the jobs that need to be started during initialisation.
            List<JSONObject> list = Db.use(zyk).find(SqlId.of("kee", "selectJob"),
                    Db.buildKeyMap("statusArr",restartStatuses,"ddjd",Conf.getAppdm()));
            // Move those jobs to the waiting status (original note).
            Db.use(zyk).update(SqlId.of("kee", "initUpdateJob"),
                    Db.buildKeyMap("statusArr",restartStatuses,"ddjd",Conf.getAppdm()));
            // Start them one by one.
            for(final JSONObject job:list){
                try {
                    job.put("zyk",zyk);
                    startJob(job.toJavaObject(VJob.class));
                } catch (Exception e) {
                    slog.error("启动job失败:"+job, e);
                }
            }
            slog.info("{}资源库初始化自动启动作业数：{}",zyk,list.size());
        }
    }

    /**
     * Creates a new KettleManager instance.
     */
    public KettleManager() {
    }

    /**
     * Resets a job: if a bean is registered under the given bean's key, it is
     * removed through {@link #removeJob(VJob)}.
     *
     * @author jingma
     * @param jobBean bean whose key identifies the job to reset
     */
    public static void resetJob(VJob jobBean){
        synchronized (JobMap) {
            // Single lookup: removeJob() does not take this lock, so a
            // containsKey()/get() pair could observe the entry vanishing in between.
            VJob registered = JobMap.get(jobBean.getKey());
            if(registered != null){
                removeJob(registered);
                // Also make sure nothing stays registered under the requested key.
                JobMap.remove(jobBean.getKey());
            }
        }
    }

    /**
     * Removes a job: closes its log through {@link FileLoggingEventListener}
     * (failures are only logged at debug level), unregisters it from
     * {@link #JobMap} and persists its current status.
     *
     * @author jingma
     * @param job job to remove
     * @return the bean that was registered under the job's key, or
     *         {@code null} when none was registered
     */
    public static VJob removeJob(VJob job) {
        try {
            FileLoggingEventListener.close(job);
        } catch (Exception e) {
            slog.debug("关闭日志失败："+job.getName(),e);
        }
        // remove() hands back the previous mapping, so no separate get() is needed.
        VJob registered = JobMap.remove(job.getKey());
        updateJobStatus(job);
        return registered;
    }

    /**
     * Starts a job.
     * <p>
     * If a bean is already registered under the same key, its current status
     * is returned and nothing is started. Otherwise the job is loaded from the
     * bean's repository, the parameters stored in {@code kettle_kz_zycs} for
     * the job id are applied, the log level is resolved from the
     * {@code KETTLE_LOG_LEVEL} dictionary, the bean is registered in
     * {@link #JobMap}, the job is started and its status is persisted through
     * {@link #updateZykz(VJob)}.
     *
     * @author jingma
     * @param jobBean job bean
     * @return the status of the job after starting, or of the job that was
     *         already registered
     * @throws Exception propagated from loading or configuring the job
     */
    public static String startJob(VJob jobBean) throws Exception {
        synchronized (JobMap) {
            int jobId = jobBean.getId_job();
            // Single lookup instead of containsKey()+get(): removeJob() is not
            // guarded by this lock and may drop the entry between the two calls.
            VJob registered = JobMap.get(jobBean.getKey());
            if(registered != null){
                return getJobStatus(registered);
            }
            long loadStart = System.currentTimeMillis();
            JobMeta jm = Kettle.use(jobBean.getZyk()).loadJob(jobBean.getId_job());
            slog.debug("加载作业总耗时："+(System.currentTimeMillis()-loadStart)+","+jobBean.getName());
            Map<String, JSONObject> paramMap = Db.use(jobBean.getZyk()).
                    findMap("key","select * from kettle_kz_zycs jp where jp.id_job=?", jobId);
            for(JSONObject param:paramMap.values()){
                // Apply the stored parameter to the job definition.
                jm.setParameterValue(param.getString("key"),
                        param.getString("value"));
            }
            jm.setLogLevel(LogLevel.getLogLevelForCode(DictManager.zdMcByDm("KETTLE_LOG_LEVEL", jobBean.getRzjb())));
            Job job = new Job(Kettle.use(jobBean.getZyk()).getRepository(), jm);
            job.setLogLevel(jm.getLogLevel());
            job.setName(jobBean.getKey()+" - "+job.getName());
            // Attach the runtime job to the bean before it is registered.
            jobBean.setJob(job);
            FileLoggingEventListener.addJobLogFile(jobBean);
            JobMap.put(jobBean.getKey(), jobBean);
            job.start();
            String status = getJobStatus(jobBean);
            // Persist the job status.
            VJob vjob = new VJob(jobBean.getZyk(),jobId, status);
            vjob.setDsms(TimingUtil.showTextByJobid(jobBean));
            updateZykz(vjob);
            slog.info("作业启动完成："+jobBean.getName());
            return status;
        }
    }

    /**
     * Stops a job.
     *
     * @author jingma
     * @param jobBean bean whose key identifies the job
     * @return {@code Trans.STRING_STOPPED} when no job is registered under the
     *         key, otherwise the job status read after the stop request
     */
    public static String stopJob(VJob jobBean) {
        VJob job = JobMap.get(jobBean.getKey());
        if(job == null){
            return Trans.STRING_STOPPED;
        }
        Kettle.jobStopAll(job.getJob());
        String status = getJobStatus(job);
        slog.info("作业停止完成："+job.getName());
        return status;
    }

    /**
     * Kills a job. Does nothing when no job is registered under the key.
     *
     * @author jingma
     * @param jobBean bean whose key identifies the job
     */
    public static void killJob(VJob jobBean) {
        VJob job = JobMap.get(jobBean.getKey());
        if(job == null){
            return;
        }
        Kettle.jobKillAll(job.getJob());
        slog.info("作业结束完成："+job.getName()+",线程状态："+job.getJob().getState());
    }

    /**
     * Returns the runtime log of a registered job.
     *
     * @author jingma
     * @param key key of the job in {@link #JobMap}
     * @param startLineNr first buffer line number to read from; moved forward
     *        when more than 2000 lines lie between it and the last buffer line
     * @return a success result whose message is the log text and whose data is
     *         the last buffer line number; or a success result carrying only a
     *         hint message when the job is not registered, or when the log text
     *         is blank and the start line is 0
     */
    public static Result getLog(String key,int startLineNr) {
        VJob job = JobMap.get(key);
        if(job == null){
            return success("该作业当前未运行。若想查看历史运行日志信息，请到【基础日志】页面查询并下载对应日志文件。");
        }
        int lastLineNr = KettleLogStore.getLastBufferLineNr();
        if(lastLineNr-startLineNr>2000){
            // Cap the number of log lines returned by a single call.
            startLineNr = lastLineNr-2000;
        }
        String msg = KettleLogStore.getAppender().getBuffer(
                job.getJob().getLogChannel().getLogChannelId(), false,
                startLineNr , lastLineNr ).toString();
        // NOTE(review): startLineNr may already have been moved forward by the
        // cap above, in which case a request that started at 0 no longer
        // matches this check - confirm whether that is intended.
        if(StringUtil.isBlank(msg)&&startLineNr==0){
            return success("这里只能显示最近较短时间的实时运行日志。若想查看历史运行日志信息，请到【基础日志】页面查询并下载对应日志文件。");
        }
        Result r = success(msg);
        r.setData(lastLineNr);
        return r;
    }

    /**
     * Renders the image of a job. Uses the metadata of the registered job when
     * one exists under the bean's key; otherwise loads the job from the bean's
     * repository.
     *
     * @author jingma
     * @param jobBean bean identifying the job
     * @return the generated job image
     * @throws Exception propagated from loading the job or generating the image
     */
    public static BufferedImage getJobImg(VJob jobBean) throws Exception {
        VJob job = JobMap.get(jobBean.getKey());
        BufferedImage image;
        if(job == null){
            image = Kettle.generateJobImage(Kettle.use(jobBean.getZyk()).loadJob(jobBean.getId_job()));
        }else{
            image = Kettle.generateJobImage(job.getJob().getJobMeta());
        }
        return image;
    }

    /**
     * Renders the image of a transformation.
     *
     * @author jingma
     * @param trans object providing the repository under {@code zyk} and the
     *        transformation id under {@code id_transformation}
     * @return the generated transformation image
     * @throws Exception propagated from loading the transformation or
     *         generating the image
     */
    public static BufferedImage getTransImg(JSONObject trans) throws Exception {
        TransMeta t = Kettle.use(trans.getString("zyk")).loadTrans(trans.getLongValue("id_transformation"));
        return Kettle.generateTransformationImage(t);
    }

    /**
     * Returns the default configuration as pretty-printed JSON. The
     * configuration object is currently empty.
     *
     * @return the JSON text of an empty configuration object
     * @throws Exception declared by the signature; nothing in the body throws
     *         a checked exception
     */
    public String getDefaultConfigInfo() throws Exception {
        JSONObject params = new JSONObject();
        return JSON.toJSONString(params, true);
    }

    /**
     * @return the value of {@code benma666.km.write-log-file} parsed as a
     *         boolean; the default text is {@code "true"}
     */
    public static Boolean isWriteLogFile() {
        return Boolean.valueOf(Conf.getVal("benma666.km.write-log-file","true"));
    }

    /**
     * @return the value of the {@code logging.file.path} configuration entry
     */
    public static String getLogFileRoot() {
        return Conf.getVal("logging.file.path");
    }

    /**
     * @return the value of {@code benma666.km.log-file-size} parsed as a
     *         double; the default text is {@code "20"}
     */
    public static double getLogFileSize() {
        return Double.parseDouble(Conf.getVal("benma666.km.log-file-size","20"));
    }

    /**
     * Looks up the id of a job entry type by its code in
     * {@code r_jobentry_type}.
     *
     * @author jingma
     * @param job bean providing the repository to query
     * @param typeName job entry type code
     * @return the job entry type id
     * @throws IllegalArgumentException when the lookup yields no id for the
     *         given code
     */
    public static int getJobentryTypeId(VJob job,String typeName){
        Integer typeId = TypeUtils.castToInt(Db.use(job.getZyk()).queryStr(
                "select jt.id_jobentry_type from r_jobentry_type jt where jt.code=?",typeName));
        if(typeId == null){
            // castToInt yields null for a missing value; unboxing it would
            // surface as a bare NullPointerException without any context.
            throw new IllegalArgumentException("未找到作业组件类型："+typeName);
        }
        return typeId;
    }

    /**
     * Persists the job extension record: performs an update and, when the
     * update reports zero affected records ({@code $.data.czjls}), an insert.
     *
     * @param yobj values to persist
     * @return the result of the update, or of the insert when it was needed
     */
    public static Result updateZykz(VJob yobj){
        MyParams zykzJcxx = LjqManager.jcxxByDxdm("KETTLE_GLPT_ZYKZ");
        JSONObject o = yobj.toJSONObject();
        zykzJcxx.put(UtilConst.KEY_YOBJ,o);
        zykzJcxx.set("$.sjdx.dxzt",yobj.getZyk());
        Result r = LjqManager.update(zykzJcxx);
        if(r.getIntValue("$.data.czjls")==0){
            // Nothing was updated, so insert the record instead.
            r = LjqManager.insert(zykzJcxx);
        }
        return r;
    }

    /**
     * Persists the current status of a job through {@link #updateZykz(VJob)}.
     *
     * @author jingma
     * @param job job whose status is read and stored
     */
    public static void updateJobStatus(VJob job) {
        VJob vJob = new VJob(job.getZyk(),job.getId_job(),getJobStatus(job));
        KettleManager.updateZykz(vJob);
    }

    /**
     * Returns the status reported by the job's runtime {@code Job}; a status
     * containing {@code "errors"} is reported as {@link #STOP_FAILED}.
     *
     * @author jingma
     * @param job job bean; its {@code getJob()} is dereferenced without a null
     *        check (NOTE(review): confirm every caller passes a bean that has
     *        a runtime job attached)
     * @return the job status
     */
    public static String getJobStatus(VJob job) {
        String status = job.getJob().getStatus();
        if(status.contains("errors")){
            status = STOP_FAILED;
        }
        return status;
    }
}
